/**
 * Copyright (c) 2018, Mr.Wang (recallcode@aliyun.com) All rights reserved.
 */

package cn.recallcode.iot.mqtt.server.broker.protocol;

import cn.recallcode.iot.mqtt.server.broker.internal.InternalCommunication;
import cn.recallcode.iot.mqtt.server.broker.protocol.impl.*;
import cn.recallcode.iot.mqtt.server.common.auth.IAuthService;
import cn.recallcode.iot.mqtt.server.common.message.IDupPubRelMessageStoreService;
import cn.recallcode.iot.mqtt.server.common.message.IDupPublishMessageStoreService;
import cn.recallcode.iot.mqtt.server.common.message.IMessageIdService;
import cn.recallcode.iot.mqtt.server.common.message.IRetainMessageStoreService;
import cn.recallcode.iot.mqtt.server.common.session.ISessionStoreService;
import cn.recallcode.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/**
 * Protocol processing: routes each incoming MQTT message to the
 * {@link AbstractMessageProcessor} registered for its {@link MqttMessageType}.
 *
 * <p>The processor table is filled once in {@link #init()} after dependency
 * injection and is only read afterwards by this class.
 */
@Component
@Slf4j
@Data
public class ProtocolProcess {

	@Autowired
	private ISessionStoreService sessionStoreService;

	@Autowired
	private ISubscribeStoreService subscribeStoreService;

	@Autowired
	private IAuthService authService;

	@Autowired
	private IMessageIdService messageIdService;

	@Autowired
	private IRetainMessageStoreService messageStoreService;

	@Autowired
	private IDupPublishMessageStoreService dupPublishMessageStoreService;

	@Autowired
	private IDupPubRelMessageStoreService dupPubRelMessageStoreService;

	@Autowired
	private InternalCommunication internalCommunication;

	/** Dispatch table: message type -> processor. Keys are enum constants, hence an EnumMap. */
	private final Map<MqttMessageType, AbstractMessageProcessor> processorMap = new EnumMap<>(MqttMessageType.class);

	/**
	 * Builds the dispatch table. Runs after the {@code @Autowired} fields have been
	 * injected, because the processors receive those services through their constructors.
	 *
	 * <p>Message types that are not registered here are logged and discarded by
	 * {@link #processMsg(Channel, MqttMessage)}.
	 */
	@PostConstruct
	public void init() {
		// NOTE(review): no processor is registered for SUBSCRIBE — confirm whether that is
		// intentional or handled elsewhere.
		processorMap.put(MqttMessageType.CONNECT, new Connect(sessionStoreService, subscribeStoreService, dupPublishMessageStoreService, dupPubRelMessageStoreService, authService));
		processorMap.put(MqttMessageType.UNSUBSCRIBE, new UnSubscribe(subscribeStoreService));
		processorMap.put(MqttMessageType.PUBLISH, new Publish(sessionStoreService, subscribeStoreService, messageIdService, messageStoreService, dupPublishMessageStoreService, internalCommunication));
		processorMap.put(MqttMessageType.DISCONNECT, new DisConnect(sessionStoreService, subscribeStoreService, dupPublishMessageStoreService, dupPubRelMessageStoreService));
		processorMap.put(MqttMessageType.PINGREQ, new PingReq());
		processorMap.put(MqttMessageType.PUBREL, new PubRel());
		processorMap.put(MqttMessageType.PUBACK, new PubAck(messageIdService, dupPublishMessageStoreService));
		processorMap.put(MqttMessageType.PUBREC, new PubRec(dupPublishMessageStoreService, dupPubRelMessageStoreService));
		processorMap.put(MqttMessageType.PUBCOMP, new PubComp(messageIdService, dupPubRelMessageStoreService));
	}

	/**
	 * Dispatches a message to the processor registered for its type.
	 *
	 * <p>A message that is {@code null}, that carries no fixed header, or whose type has
	 * no registered processor is logged at WARN level and dropped.
	 *
	 * @param channel the channel the message arrived on; passed through to the processor
	 * @param msg     the decoded MQTT message
	 */
	public void processMsg(Channel channel, MqttMessage msg) {
		// A message that failed decoding can arrive without a fixed header; guard against
		// it so a malformed packet does not raise a NullPointerException here.
		if (msg == null || msg.fixedHeader() == null) {
			log.warn("msg without fixed header, msg discard={}", msg);
			return;
		}
		MqttMessageType msgType = msg.fixedHeader().messageType();
		AbstractMessageProcessor processor = processorMap.get(msgType);
		if (processor == null) {
			log.warn("unrecognized messageType={}, msg discard={}", msgType, msg);
			return;
		}
		processor.process(channel, msg);
	}

	/**
	 * Hands a message directly to the PUBLISH processor. The processor is looked up by
	 * the constant {@link MqttMessageType#PUBLISH}; the type in the message's own header
	 * is not inspected here.
	 *
	 * @param channel the channel passed through to the processor
	 * @param msg     the message to process as a PUBLISH
	 */
	public void publish(Channel channel, MqttMessage msg) {
		AbstractMessageProcessor processor = processorMap.get(MqttMessageType.PUBLISH);
		if (processor == null) {
			log.warn("faild to find PUBLISH messageProcessor for channel={}, msg={}", channel, msg);
			return;
		}
		processor.process(channel, msg);
	}
}
